/*
 * CondorGPBSDispatcher.java
 *
 * Created on June 8, 2004, 11:17 AM
 *
 * This file is part of the STAR Scheduler.
 * Copyright (c) 2002-2003 STAR Collaboration - Brookhaven National Laboratory
 *
 * STAR Scheduler is free software; you can redistribute it and/or modify
 * it under the terms of the GNU General Public License as published by
 * the Free Software Foundation; either version 2 of the License, or
 * (at your option) any later version.
 *
 * STAR Scheduler is distributed in the hope that it will be useful,
 * but WITHOUT ANY WARRANTY; without even the implied warranty of
 * MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 * GNU General Public License for more details.
 *
 * You should have received a copy of the GNU General Public License
 * along with STAR Scheduler; if not, write to the Free Software
 * Foundation, Inc., 59 Temple Place, Suite 330, Boston, MA 02111-1307 USA
 */
package gov.bnl.star.offline.scheduler.Dispatchers.condorg;

import gov.bnl.star.offline.scheduler.*;
import gov.bnl.star.offline.scheduler.request.Request;
import gov.bnl.star.offline.scheduler.Dispatchers.lsf.CSHApplication;
import gov.bnl.star.offline.scheduler.Dispatchers.lsf.LSFDispatcher;
import gov.bnl.star.offline.scheduler.util.CSHCommandLineTask;
import gov.bnl.star.offline.scheduler.util.FilesystemToolkit;
//import gov.bnl.star.offline.scheduler.util.StatisticsRecorder;

import java.io.File;
import java.io.FileOutputStream;
import java.io.PrintStream;
import java.util.*;

import java.util.logging.Level;
import java.util.logging.Logger;


/** Dispatches jobs using Condor-G on a remote site that uses PBS.
 * It will NOT use extra rsl attributes for PBS. If needed they will
 * be added later.
 * <p>
 * For every job the dispatcher asks the {@link CSHApplication} to prepare the
 * job, writes a Condor class ad file named <code>sched&lt;jobID&gt;.condorg</code>
 * and runs the configured submit executable on that file.
 * <p>
 * Note that the submit executable ({@link #setCondorEx(String)}) is stored in a
 * static field and is therefore shared by all instances of this class.
 *
 * @author Alex Withers
 * @version 1.0 2004/06/08
 */
public class CondorGPBSDispatcher extends LSFDispatcher {
    private static final Logger log = Logger.getLogger(CondorGPBSDispatcher.class.getName());

    /** Text that precedes the cluster id in the output of a successful submission. */
    private static final String SUBMIT_MARKER = "submitted to cluster";

    /** Submit executable; static, hence shared by every instance. */
    private static String condorEx;

    /** Writes the script that will be executed by the batch system. */
    protected CSHApplication application;

    /** Value written to the <code>globusscheduler</code> class ad attribute. */
    private String gatekeeper;

    /** Holds value of property transferExecutable. */
    private boolean transferExecutable;

    /** Value written to <code>remote_initialdir</code>; "." means the current directory. */
    private String remoteInitialDir;

    /** Creates a new dispatcher */
    public CondorGPBSDispatcher() {
    }

    /** Sets the executable used to submit class ads.
     * @param condorEx the submit executable (shared by all instances)
     */
    public void setCondorEx(String condorEx) {
        // The field is static: assign it through the class, not through "this".
        CondorGPBSDispatcher.condorEx = condorEx;
    }

    /** Returns the executable used to submit class ads.
     * @return the submit executable
     */
    public String getCondorEx() {
        return condorEx;
    }

    /** Creates the scripts and dispatches the jobs on the target machine.
     * @param request the job request
     * @param jobs the {@link Job}s to dispatch
     */
    public void dispatch(Request request, List jobs) {
        log.info("Dispatching using Condor-g and PBS: \"" + request.getCommand() + "\"");

        // Enables the simulation mode if necessary
        useSimulationMode(request.getSimulation());
        reportedFailure = false;

        // Submits from the higher to the lower JobID. This way the
        // user has a feel of when the last job is going to be
        // submitted
        for (int nProcess = jobs.size() - 1; nProcess >= 0; nProcess--) {
            Job job = (Job) jobs.get(nProcess);

            System.out.print("Dispatching process " + job.getJobID() + ".");
            dispatch(request, job);
        }

        //StatisticsRecorder.getInstance().recordStatistics(request, jobs); //removed and moved to frame-work
    }

    /** Prepares the script and class ad for one job and submits it, retrying
     * up to <code>getMaxAttempts()</code> times. Nothing is submitted in
     * simulation mode.
     * @param request the request that originated the job
     * @param job the job to be dispatched
     */
    protected void dispatch(Request request, Job job) {
        // The CSHApplication is normally injected through setApplication() by the
        // config file. When it was not (e.g. in jUnit tests), report it and fall
        // back to the default "CSHApplication" component.
        if (application == null) {
            System.out.println(" >>>>>>>>>>>> CSHapplacation is not is italized.");
            String notSet = "The CSHApplication for this dispatcher was not set in the config file. If this is a jUnit test this is normal. Finding default \"CSHApplication\" in ComponentLibrary.";
            log.warning(notSet);
            System.out.println(notSet);
            application = (CSHApplication) ComponentLibrary.getInstance().getComponent("CSHApplication");
        }

        String command = getCondorGCommand(request, job);

        // TODO: all the parameters should be passed in one go
        application.setJob(request, job);
        application.setScratchDir(scratchDir);
        application.setSubmissionCommand(command);

        application.prepareJob();
        prepareClassAd(request, job);

        log.info("Executing \"" + command + "\"");

        if (simulation) {
            System.out.println(" simulated.");
            return;
        }

        sleepQuietly(getMsBtwnSuccess());

        long startTime = System.currentTimeMillis();
        int attempt = 0;
        boolean success = false;

        while (!success && (attempt < getMaxAttempts())) {
            try {
                CSHCommandLineTask task = new CSHCommandLineTask(command, true, 30000);
                task.execute();

                if (task.getExitStatus() == 0) {
                    // Flag success first: the job has been submitted, so a problem
                    // while recording it must not trigger a second submission.
                    success = true;
                    job.DispatchSuccessful();

                    String clusterId = parseClusterId(task.getOutput());
                    if (clusterId != null) {
                        job.AddProcesseID(clusterId);
                    } else {
                        log.warning("Could not find \"" + SUBMIT_MARKER
                            + "\" in the submission output, no process ID recorded: "
                            + task.getOutput());
                    }

                    job.setDispatchTime((int) Math.min(
                            System.currentTimeMillis() - startTime, java.lang.Integer.MAX_VALUE));
                } else {
                    log.warning("Condor-g submission failed: " + task.getOutput());
                }
            } catch (Exception e) {
                log.log(Level.SEVERE, "Couldn't submit the script to Condor-g", e);
            }

            if (!success) {
                sleepQuietly(getMsBtwnFailure());
                System.out.print("/");
                attempt++;
            }
        }

        if (success) {
            System.out.println(" done.");
        } else {
            System.out.println(" FAILED!!");
        }
    }

    /** Extracts the cluster id that follows {@link #SUBMIT_MARKER} in the
     * submission output: everything after the marker, with '.' replaced by a
     * blank and surrounding whitespace removed.
     * @param output the output of the submit command, may be null
     * @return the cluster id, or null when the marker is not present
     */
    private static String parseClusterId(String output) {
        if (output == null) {
            return null;
        }

        int markerStart = output.indexOf(SUBMIT_MARKER);
        if (markerStart == -1) {
            return null;
        }

        return output.substring(markerStart + SUBMIT_MARKER.length()).replace('.', ' ').trim();
    }

    /** Sleeps for the given time. An interruption ends the sleep early and
     * restores the thread's interrupt flag instead of being silently dropped.
     * @param millis time to sleep in milliseconds; values &lt;= 0 do not sleep
     */
    private static void sleepQuietly(long millis) {
        if (millis <= 0) {
            return;
        }

        try {
            Thread.sleep(millis);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    /** Returns the command line to submit the job through condor-g.
     * @param request the request that originated the job
     * @param job the job to be dispatched
     * @return the commandline to submit the job
     */
    protected String getCondorGCommand(Request request, Job job) {
        return condorEx + " " + getClassAdName(request, job);
    }

    /** Returns the name of the file containing the class ad. Class ad is the job
     * description required by condor to submit a job.
     * @param request the request that originated the job
     * @param job the job to be submitted
     * @return the file name of the class ad
     */
    protected String getClassAdName(Request request, Job job) {
        return "sched" + job.getJobID() + ".condorg";
    }

    /** Writes the class ad file for the job and closes it.
     * @param request the request that originated the job
     * @param job the job to be submitted
     * @throws RuntimeException if the class ad cannot be created or written
     */
    private void prepareClassAd(Request request, Job job) {
        String classAdName = getClassAdName(request, job);
        PrintStream classAd = null;

        try {
            classAd = new PrintStream(new FileOutputStream(new File(classAdName)));
            createClassAd(request, job, classAd);

            // PrintStream never throws on write; it only records that an error happened.
            if (classAd.checkError()) {
                throw new java.io.IOException("error while writing the file");
            }
        } catch (Exception e) {
            log.log(Level.SEVERE, "Couldn't create the class ad", e);
            throw new RuntimeException("Couldn't create the class ad " +
                classAdName + ": " + e.getMessage(), e);
        } finally {
            // Release the file handle before the submit command reads the file.
            if (classAd != null) {
                classAd.close();
            }
        }
    }

    /** Prints the class ad attributes of the job to the given stream.
     * Optional attributes (arguments, input, output, error, remote_initialdir)
     * are written only when their value is not null.
     * @param request the request that originated the job
     * @param job the job to be submitted
     * @param classAd the stream the class ad is written to
     */
    private void createClassAd(Request request, Job job, PrintStream classAd) {
        classAd.println("executable = " + getExecutable());

        String arguments = getArguments();
        if (arguments != null) {
            classAd.println("arguments = " + arguments);
        }

        classAd.println("globusscheduler = " + getGlobusScheduler());

        if (application.getStdin() != null) {
            classAd.println("input = " + application.getStdin());
        }

        if (application.getStdout() != null) {
            classAd.println("output = " + application.getStdout());
        }

        if (application.getStderr() != null) {
            classAd.println("error = " + application.getStderr());
        }

        classAd.println("log = " + getLogName(job));

        String remoteDirectory = getRemoteDirectory();
        if (remoteDirectory != null) {
            classAd.println("remote_initialdir = " + remoteDirectory);
        }

        /* This is basically the main difference from
         * CondorGLSFDispatcher.java. No globus-rsl stuff: no "globusrsl"
         * attribute (xlsfmachine, xlsfjobname, xlsfmailreport,
         * xlsfresources, queue) is written here.
         * -- Alex Withers
         */

        classAd.println("transfer_executable = " + isTransferExecutable());
        classAd.println("notification = never");
        classAd.println("universe = globus");
        classAd.println("queue");
    }

    /** Returns the application command line up to its first blank
     * (the whole command line when it contains no blank).
     */
    private String getExecutable() {
        String commandLine = application.getCommandLine();
        int firstBlank = commandLine.indexOf(' ');

        if (firstBlank == -1) {
            return commandLine;
        }

        return commandLine.substring(0, firstBlank);
    }

    /** Returns what follows the first blank of the application command line,
     * or null when the command line contains no blank.
     */
    private String getArguments() {
        String commandLine = application.getCommandLine();
        int firstBlank = commandLine.indexOf(' ');

        if (firstBlank == -1) {
            return null;
        }

        return commandLine.substring(firstBlank + 1);
    }

    /** Returns the name of the condor log file of the job. */
    private String getLogName(Job job) {
        // TODO maybe log filename should be put as a general property of Process (as stds)
        return "sched" + job.getJobID() + ".condorg.log";
    }

    /** Returns the value of the <code>globusscheduler</code> attribute. */
    private String getGlobusScheduler() {
        //TODO make it flexible
        return getGlobusGatekeeper();
    }

    /** Sets the gatekeeper written as <code>globusscheduler</code>.
     * @param gatekeeper the gatekeeper
     */
    public void setGlobusGatekeeper(String gatekeeper) {
        this.gatekeeper = gatekeeper;
    }

    /** Returns the gatekeeper written as <code>globusscheduler</code>.
     * @return the gatekeeper
     */
    public String getGlobusGatekeeper() {
        return gatekeeper;
    }

    /** Sets the remote initial directory; "." stands for the current directory.
     * @param remoteInitialDir the remote initial directory
     */
    public void setRemoteInitialDir(String remoteInitialDir) {
        this.remoteInitialDir = remoteInitialDir;
    }

    /** Returns the remote initial directory as it was set.
     * @return the remote initial directory
     */
    public String getRemoteInitialDir() {
        return remoteInitialDir;
    }

    /** Returns the directory written as <code>remote_initialdir</code>:
     * the current directory when the property is ".", the property otherwise.
     */
    private String getRemoteDirectory() {
        // TODO this has to be specified better: remote execution directory could be different from scheduler execution directory
        if (".".equals(getRemoteInitialDir())) {
            return FilesystemToolkit.getCurrentDirectory();
        }

        return getRemoteInitialDir();
    }

    /** Returns the resource usage switch of the superclass with every double
     * quote preceded by a backslash.
     * @param job the job
     * @return the escaped switch, or null when the superclass returns null
     */
    protected String getResourceUsageSwitch(Job job) {
        String res = super.getResourceUsageSwitch(job);
        if (res == null) {
            return res;
        }

        return res.replaceAll("\"", "\\\\\"");
    }

    /** Getter for property transferExecutable.
     * @return Value of property transferExecutable.
     */
    public boolean isTransferExecutable() {
        return this.transferExecutable;
    }

    /** Setter for property transferExecutable.
     * @param transferExecutable New value of property transferExecutable.
     */
    public void setTransferExecutable(boolean transferExecutable) {
        this.transferExecutable = transferExecutable;
    }

    /** Set the class that writes the script that will be executed by the batch system */
    public void setApplication(CSHApplication application) {
        this.application = application;
    }

    /** Get the class that writes the script that will be executed by the batch system */
    public CSHApplication getApplication() {
        return application;
    }

    /** Runs <code>condor_rm</code> on every process ID of every job.
     * Each job is removed from <code>jobs</code> once it has been handled, so
     * the list is empty when this method returns. The process IDs of a handled
     * job are cleared whether or not the removal succeeded.
     * @param request the request that originated the jobs (not used here)
     * @param jobs the {@link Job}s to kill; emptied by this method
     */
    public void Kill(Request request, List jobs) {
        // Drain from the front: same visiting order and same final (empty) list as
        // an index loop that removes the current element and steps back.
        while (!jobs.isEmpty()) {
            Job job = (Job) jobs.get(0);

            if (job.getProcesseIDs().size() == 0) {
                System.out.println("No ProcesseIDs found for job " + job.getJobID());
            } else {
                for (int i = 0; i < job.getProcesseIDs().size(); i++) {
                    killProcess(job, i);
                }

                job.clearProcesseIDs();
            }

            jobs.remove(0);
        }
    }

    /** Runs <code>condor_rm</code> on one process ID of a job, retrying up to
     * <code>getMaxAttempts()</code> times with a pause between attempts.
     * @param job the job that owns the process ID
     * @param index position of the process ID in the job's list
     */
    private void killProcess(Job job, int index) {
        int attempt = 0;
        boolean success = false;
        String commandOutput = "";

        System.out.print("ProcesseID: <" + job.getProcesseIDs().get(index)
            + "> of Job: <" + job.getJobID() + ">");

        while (!success && (attempt < getMaxAttempts())) {
            try {
                CSHCommandLineTask task = new CSHCommandLineTask(
                        "condor_rm " + ((String) job.getProcesseIDs().get(index)),
                        true, getMaxElapseTime());
                task.execute();
                commandOutput = task.getOutput();

                if (task.getExitStatus() == 0) {
                    success = true;
                    System.out.println("Killed");
                } else {
                    log.warning("condor_rm " + commandOutput);
                    System.out.print(commandOutput);

                    // "Couldn't find" is not retried; presumably the job is no
                    // longer known to condor.
                    if (commandOutput.lastIndexOf("Couldn't find") != -1) {
                        success = true;
                    }
                }
            } catch (Exception e) {
                System.out.print("condor_rm failed" + e);
                System.out.print(commandOutput);
            }

            if (!success) {
                // One attempt counted and one pause per failure.
                System.out.print("/");
                attempt++;
                sleepQuietly(getMsBtwnFailure());
            }
        }
    }

    /** Queries the state of one process of a job with <code>condor_q</code>,
     * retrying up to <code>getMaxAttempts()</code> times.
     * @param job the job to query
     * @param Processe index of the process ID in the job's list
     * @return a message when the job has no process IDs or the index does not
     *         exist; "Done or Killed" when the output is shorter than 217
     *         characters; otherwise the trimmed characters 214-215 of the
     *         output, with values starting with "R" reported as "RUN";
     *         "condor_q failed" when every attempt failed
     */
    public String Status(Job job, int Processe) {
        if (job.getProcesseIDs().size() == 0) {
            return "No ProcesseIDs found for job " + job.getJobID();
        }

        // Valid indexes are 0 .. size-1.
        if (Processe < 0 || job.getProcesseIDs().size() <= Processe) {
            return job.getJobID() + " only has " + job.getProcesseIDs().size() + "processes, processe " + Processe + "dose not exist.";
        }

        int attempt = 0;
        String commandOutput = "";

        System.out.print("ProcesseID: <" + job.getProcesseIDs().get(Processe)
            + "> of Job: <" + job.getJobID() + ">");

        while (attempt < getMaxAttempts()) {
            try {
                CSHCommandLineTask task = new CSHCommandLineTask(
                        "condor_q " + ((String) job.getProcesseIDs().get(Processe)),
                        true, getMaxElapseTime());
                task.execute();
                commandOutput = task.getOutput();

                if (task.getExitStatus() == 0) {
                    // NOTE(review): kept from the original; marking the dispatch as
                    // successful from a status query looks like copy-paste - confirm.
                    job.DispatchSuccessful();

                    // No process ID is added here: this is condor_q output, which is
                    // not expected to contain the "submitted to cluster" banner, so
                    // parsing it would append an arbitrary substring on every query.

                    if (commandOutput.length() < 217) {
                        return "Done or Killed";
                    }

                    // Fixed character positions; presumably the state column of the
                    // condor_q listing - TODO confirm against the condor_q version in use.
                    String state = commandOutput.substring(214, 216).trim();
                    if (state.startsWith("R")) {
                        state = "RUN";
                    }

                    return state;
                }

                log.warning("condor_q " + commandOutput);
            } catch (Exception e) {
                System.out.print("condor_q failed" + e);
                System.out.print(commandOutput);
            }

            // One attempt counted and one pause per failure.
            System.out.print("/");
            attempt++;
            sleepQuietly(getMsBtwnFailure());
        }

        return "condor_q failed";
    }

    /** Does nothing. */
    public void stop() {
    }
}